#include "config.h"

#include <dirent.h>
#include <sys/types.h>
#include <regex.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <sys/vfs.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/statvfs.h>

#define DBG_SUBSYS S_LIBCLUSTER

#include "sysy_lib.h"
#include "cluster.h"
#include "env.h"
#include "configure.h"
#include "lichconf.h"
#include "job_dock.h"
#include "rpc_proto.h"
#include "metadata.h"
#include "ynet_rpc.h"
#include "net_global.h"
#include "cluster_rpc.h"
#include "longtask.h"
#include "node.h"
#include "etcd.h"
#include "ylog.h"
#include "dbg.h"

static int __cluster_countnode(int *count)
{
        int ret;
        etcd_node_t *list = NULL;

        ret = etcd_list(ETCD_NODE, &list);
        if (unlikely(ret)) {
                if (ret == ENOKEY) {
                        DINFO("node table empty\n");
                        *count = 0;
                        goto out;
                } else
                        GOTO(err_ret, ret);
        }

        *count = list->num_node;
        free_etcd_node(list);

out:
        return 0;
err_ret:
        return ret;
}

int cluster_countnode(int *count)
{
        int ret, retry0 = 0;
        time_t now;
        static time_t pre_load = 0;
        static int node_count = 0;

        now = gettime();
        if (pre_load == 0 || (now - pre_load) > 10) {
        retry:
                ret = __cluster_countnode(&node_count);
                if (ret) {
                        if (ret == EAGAIN || ret == ENONET) {
                                USLEEP_RETRY(err_ret, ret, retry, retry0, 30, (1000*1000));
                        } else {
                                GOTO(err_ret, ret);
                        }
                }
                pre_load = now;
        }

        *count = node_count;
        return 0;
err_ret:
        return ret;
}

int cluster_is_solomode()
{
        int ret;
        int node_count = 0;
        static int pre_solomode = 1;

        if (!gloconf.solomode)
                return 0;

        if (!pre_solomode)
                return 0;

        ret = cluster_countnode(&node_count);
        if (ret) {
                DERROR("cluster count node fail. ret:%d, errmsg:%s\n", ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        DBUG("solomode %d pre_solomode %d node_count %d\n", gloconf.solomode, pre_solomode, node_count);

        if (node_count == 1) {
                pre_solomode = 1;
                return 1;
        } else {
                pre_solomode = 0;
                return 0;
        }

err_ret:
        return 0;
}

int cluster_listnode_open(const char *uuid)
{
        int ret;
        char buf[MAX_BUF_LEN];

        ret = maping_getmaster(buf, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = node_getlist_open(buf, uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int cluster_listnode(char *_buf, int *len, const char *uuid, uint64_t offset)
{
        int ret;
        char buf[MAX_BUF_LEN];

        ret = maping_getmaster(buf, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = node_getlist(buf, _buf, len, uuid, offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int cluster_listnode_close(const char *uuid)
{
        int ret;
        char buf[MAX_BUF_LEN];

        ret = maping_getmaster(buf, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = node_getlist_close(buf, uuid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

#if 1
int cluster_listnode_iterator(func_int1_t func, void *arg)
{
        int ret, i;
        etcd_node_t *list = NULL, *node;

        ret = etcd_list(ETCD_NODE, &list);
        if (unlikely(ret)) {
                if (ret == ENOKEY) {
                        DINFO("node table empty\n");
                        goto out;
                } else
                        GOTO(err_ret, ret);
        }

        for(i = 0; i < list->num_node; i++) {
                node = list->nodes[i];
                
                ret = func(node->key, arg);
                if (unlikely(ret))
                        GOTO(err_free, ret);

        }

        free_etcd_node(list);
        
out:
        return 0;
err_free:
        free_etcd_node(list);
err_ret:
        return ret;
}

int cluster_listnode_iterator1(func_int1_t func, void *arg)
{
        int ret, i, retry = 0;
        char tmp[MAX_BUF_LEN];
        etcd_node_t *list = NULL, *node;
        nid_t nid;

        ret = etcd_list(ETCD_NODE, &list);
        if (unlikely(ret)) {
                if (ret == ENOKEY) {
                        DINFO("node table empty\n");
                        goto out;
                } else
                        GOTO(err_ret, ret);
        }

        for(i = 0; i < list->num_node; i++) {
                node = list->nodes[i];

                retry = 0;
        retry:
                ret = etcd_get_text(ETCD_NODE, node->key, tmp, NULL);
                if (unlikely(ret)) {
                        DERROR("%s not found\n", node->key);
                        USLEEP_RETRY(err_free, ret, retry, retry, 30, (1000*100));
                }

                str2nid(&nid, tmp);

                ret = func(&nid, arg);
                if (unlikely(ret))
                        GOTO(err_free, ret);

        }

        free_etcd_node(list);

out:
        return 0;
err_free:
        free_etcd_node(list);
err_ret:
        return ret;
}

#else

int cluster_listnode_iterator(func_int1_t func, void *arg)
{
        int ret, buflen, done = 0;
        char buf[MAX_BUF_LEN];
        uint64_t offset = 0, offset2 = 0;
        struct dirent *de;

        uuid_t _uuid;
        char uuid[MAX_NAME_LEN] = {};

        uuid_generate(_uuid);
        uuid_unparse(_uuid, uuid);

        ret = cluster_listnode_open(uuid);
        if (ret)
                GOTO(err_ret, ret);

        while (done == 0) {
                memset(buf, 0, sizeof(buf));
                ret = cluster_listnode(buf, &buflen, uuid, offset);
                if (ret)
                        GOTO(err_close, ret);

                if (buflen == 0)
                        break;

                offset2 = 0;
                dir_for_each(buf, buflen, de, offset2) {
                        if (strlen(de->d_name) == 0) {
                                done = 1;
                                break;
                        } else if (buflen - offset2 < sizeof(*de) + MAX_NAME_LEN)
                                break;

                        offset += de->d_reclen;

                        ret = func(de->d_name, arg);
                        if (unlikely(ret))
                                GOTO(err_close, ret);

                }
        }

        cluster_listnode_close(uuid);
        return 0;
err_close:
        cluster_listnode_close(uuid);
err_ret:
        return ret;
}

#endif

int cluster_get_nodes(vec_node_t *nodes, int writeable)
{
        int i, ret, buflen, exist, done = 0;
        char buf[MAX_BUF_LEN];
        uint64_t offset = 0, offset2 = 0;
        struct dirent *de;
        nodeinfo_t *info;
        cluster_node_t *node, *val;
        uuid_t _uuid;
        char uuid[MAX_NAME_LEN] = {};

        uuid_generate(_uuid);
        uuid_unparse(_uuid, uuid);

        ret = cluster_listnode_open(uuid);
        if (ret)
                GOTO(err_ret, ret);

        while (done == 0) {
                memset(buf, 0, sizeof(buf));
                ret = cluster_listnode(buf, &buflen, uuid, offset);
                if (ret)
                        GOTO(err_close, ret);

                if (buflen == 0)
                        break;

                offset2 = 0;
                dir_for_each(buf, buflen, de, offset2) {
                        if (strlen(de->d_name) == 0) {
                                done = 1;
                                break;
                        } else if (buflen - offset2 < sizeof(*de) + MAX_NAME_LEN)
                                break;

                        offset += de->d_reclen;

                        ret = cluster_node_malloc(&node);
                        if (unlikely(ret))
                                GOTO(err_close, ret);

                        ret = node_getinfo(&node->nodeinfo, de->d_name, node->tmp);
                        if (ret) {
                                cluster_node_free(&node);
                                continue;
                        }

                        if (writeable && !node_stat_writeable(node->nodeinfo.stat->status)) {
                                cluster_node_free(&node);
                                continue;
                        }

                        exist = 0;
                        vec_foreach(nodes, val, i) {
                                info = &val->nodeinfo;
                                if (0 == nid_cmp(&info->stat->nid, &node->nodeinfo.stat->nid)) {
                                        exist = 1;
                                        DERROR("cluster get node list err! nid "NID_FORMAT" same!!!\n",
                                                        NID_ARG(&info->stat->nid));
                                        break;
                                }
                        }
                        if (0 == exist) vec_push(nodes, node);
                }
        }

        cluster_listnode_close(uuid);
        return 0;
err_close:
        cluster_listnode_close(uuid);
err_ret:
        return ret;
}

static int __cluster_countnode_writeable(int *_count)
{
        int ret, buflen, count = 0, done = 0;
        char buf[MAX_BUF_LEN], tmp[MAX_BUF_LEN];
        uint64_t offset = 0, offset2 = 0;
        struct dirent *de;
        nodeinfo_t info;
        uuid_t _uuid;
        char uuid[MAX_NAME_LEN] = {};

        uuid_generate(_uuid);
        uuid_unparse(_uuid, uuid);

        ret = cluster_listnode_open(uuid);
        if (ret)
                GOTO(err_ret, ret);

        while (done == 0) {
                memset(buf, 0, sizeof(buf));
                ret = cluster_listnode(buf, &buflen, uuid, offset);
                if (ret)
                        GOTO(err_close, ret);

                if (buflen == 0)
                        break;

                offset2 = 0;
                dir_for_each(buf, buflen, de, offset2) {
                        if (strlen(de->d_name) == 0) {
                                done = 1;
                                break;
                        } else if (buflen - offset2 < sizeof(*de) + MAX_NAME_LEN)
                                break;

                        offset += de->d_reclen;

                        ret = node_getinfo(&info, de->d_name, tmp);
                        if (ret) {
                                continue;
                        }

                        if (info.stat->status & __NODE_STAT_DELETING__) {
                                continue;
                        }

                        if (!(info.stat->status & __NODE_STAT_WRITEABLE__)) {
                                continue;
                        }

                        count++;
                }
        }

        *_count = count;
        cluster_listnode_close(uuid);

        return 0;
err_close:
        cluster_listnode_close(uuid);
err_ret:
        return ret;
}

int cluster_countnode_writeable(int *_count)
{
        int ret;
        time_t now;
        static time_t pre = 0;
        static int count = 0;

        now = gettime();
        if (pre == 0 || (now - pre) > 30) {
                ret = __cluster_countnode_writeable(&count);
                if (ret) {
                        GOTO(err_ret, ret);
                }
                pre = now;
        }

        *_count = count;
        return 0;
err_ret:
        return ret;
}

int cluster_ready()
{
#if 1
        int ret;

        ret = network_connect_master();
        if (ret)
                return 0;
        else
                return 1;
        
#else
        int ret;
        nid_t admin;

        admin = *net_getadmin();

        if (net_isnull(&admin))
                return 0;

        if (net_islocal(&admin)) {
        } else {
                ret = network_connect_master();
                if (ret == 0)
                        return 1;
                else
                        return 0;
        }

        return 1;
#endif
}

int cluster_dropnode(const char **list, int count)
{
        int ret, i;
        char buf[MAX_BUF_LEN];

        ret = maping_getmaster(buf, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        YASSERT(list);

        for (i = 0; i < count; i++) {
                DBUG("list %s\n", list[i]);
                ret = node_remove(buf, list[i]);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int cluster_ping(const nid_t *nid, int timeout)
{
        if (net_islocal(nid)) {
                return 0;
        } else {
                return cluster_rpc_ping(nid, timeout);
        }
}

int cluster_analysis(const nid_t *nid, const str_t *key, str_t *value)
{
        if (net_islocal(nid)) {
                UNIMPLEMENTED(__DUMP__);
                //return longtask_drop(id);
                return 0;
        } else {
                return cluster_rpc_analysis(nid, key, value);
        }
}

int cluster_getinfo(const nid_t *nid, const nid_t *peer, ynet_net_info_t *info)
{
        uint32_t len;
        if (net_islocal(nid)) {
                len = MAX_BUF_LEN;
                return netable_getinfo(peer, info, &len);
        } else {
                return cluster_rpc_getinfo(nid, peer, info);
        }
}

int cluster_online(const nid_t *peer)
{
        int ret;
        uint32_t len;
        char buf[MAX_BUF_LEN];//, _buf[MAX_BUF_LEN];
        ynet_net_info_t *info;

        info = (void *)buf;
        len = MAX_BUF_LEN;
        ret = netable_getinfo(peer, info, &len);
        if (ret == 0)
                return 1;

        ret = cluster_rpc_getinfo(net_getadmin(), peer, info);
        if (ret == 0)
                return 1;

        return 0;
}

int cluster_set_deleting(int deleting)
{
        int ret;
        nodestat_t stat;
        nodedfree_t *dfree;
        char dfinfo[MAX_INFO_LEN];
        uint32_t mode, admin_uptime;
        uint32_t dflen;

        if (deleting)
                mode = __NODE_STAT_DELETING__;
        else
                mode = 0;

        ret = ymalloc((void **)&dfree, sizeof(*dfree));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = node_stat(&stat, dfree);
        if (unlikely(ret))
                GOTO(err_free, ret);

        dflen = MAX_INFO_LEN;
        dfinfo_encode(dfinfo, &dflen, dfree);

        yfree((void **)&dfree);

        if ((stat.status & __NODE_STAT_DELETING__) == mode)
                goto out;

        stat.status |= __NODE_STAT_DELETING__;

        ret = dispatch_heartbeat(__node__.name, dfinfo, &stat, __node__.status, &admin_uptime);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        node_set_deleting(deleting);

out:
        return 0;
err_free:
        yfree((void **)&dfree);
err_ret:
        return ret;
}

int cluster_storage_area_is_null(const char *site_name)
{
        if ((strcmp(site_name, STORAGE_AREA_VALUE_NULL) == 0) || (strlen(site_name) == 0)) {
                return 1;
        } else {
                return 0;
        }
}
